Bandwidth comparison

Post a reply

Confirmation code
Enter the code exactly as it appears. All letters are case insensitive.
Smilies
:D :) ;) :( :o :shock: :? 8-) :lol: :x :P :oops: :cry: :evil: :twisted: :roll: :!: :?: :idea: :arrow: :| :mrgreen: :geek: :ugeek:
BBCode is ON
[img] is ON
[flash] is OFF
[url] is ON
Smilies are ON
Topic review
   

Expand view Topic review: Bandwidth comparison

Re: Bandwidth comparison

Post by support »

Hello,

First: The standard API of read() and write() is that these functions may execute less bytes than requested, and it's the caller's duty to check their return value for the number of bytes actually read or written. You don't do that on the code listed above, which is probably the cause for your problems.

Please refer to sections 3.2 and 3.3 of the Xillybus host application programming guide for Linux for best practices:

http://xillybus.com/downloads/doc/xilly ... _linux.pdf

As for my suggestion on how to use fifo.c for an data integrity check, I've already answered above: Don't.

Regards,
Eli

Re: Bandwidth comparison

Post by kevin »

Hi,

Let me summarize my questions:

1) Could you advise how to check data integrity with ~/xillybus/demoapps/fifo.c ?

2) I am confused regarding the design decision of using a single shared circular FIFO with semaphore mechanism INSTEAD OF using two separate non-circular FIFO ?

Re: Bandwidth comparison

Post by kevin »

Hi, one other question.

For https://gist.github.com/anonymous/d02c83d612a6438ba29c39e6affc07ff , if I want to check data integrity in the case of loopback using the multithreaded fifo.c , how would I do so given that threads are not scheduled in any fixed order ?

line 362:
Code: Select all
if( buf[fifo->read_position] != buf[fifo->write_position] )


Code: Select all
#include <pthread.h>
#include <semaphore.h>
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <errno.h>
#include <sys/mman.h>
#include <sys/types.h>
#include <sys/stat.h>
#include <fcntl.h>

/*********************************************************************
 *                                                                   *
 *                 D E C L A R A T I O N S                           *
 *                                                                   *
 *********************************************************************/

struct xillyfifo {
  unsigned long read_total;
  unsigned long write_total;
  unsigned int bytes_in_fifo;
  unsigned int read_position;
  unsigned int write_position;
  unsigned int size;
  unsigned int done;
  unsigned char *baseaddr;
  sem_t write_sem;
  sem_t read_sem;   
};

struct xillyinfo {
  int slept;
  int bytes;
  int position;
  void *addr;
};

#define FIFO_BACKOFF 0
static int read_fd = 0;
static int write_fd = 1;

/*********************************************************************
 *                                                                   *
 *                 A P I   F U N C T I O N S                         *
 *                                                                   *
 *********************************************************************/

// IMPORTANT:
// =========
//
// NEITHER of the fifo_* functions is reentrant. Only one thread should have
// access to any set of them. This is pretty straightforward when one thread
// writes and one thread reads from the FIFO.
//
// Also make sure that fifo_drained() and fifo_wrote() are NEVER called with
// req_bytes larger than what their request-counterparts RETURNED, or
// things will go crazy pretty soon.


int fifo_init(struct xillyfifo *fifo,
         unsigned int size) {

  fifo->baseaddr = NULL;
  fifo->size = 0;
  fifo->bytes_in_fifo = 0;
  fifo->read_position = 0;
  fifo->write_position = 0;
  fifo->read_total = 0;
  fifo->write_total = 0;
  fifo->done = 0;

  if (sem_init(&fifo->read_sem, 0, 0) == -1)
    return -1; // Fail!

  if (sem_init(&fifo->write_sem, 0, 1) == -1)
    return -1;
 
  fifo->baseaddr = malloc(size);

  if (!fifo->baseaddr)
    return -1;

  if (mlock(fifo->baseaddr, size)) {
    unsigned int i;
    unsigned char *buf = fifo->baseaddr;

    fprintf(stderr, "Warning: Failed to lock RAM, so FIFO's memory may swap to disk.\n"
       "(You may want to use ulimit -l)\n");

    // Write something every 1024 bytes (4096 should be OK, actually).
    // Hopefully all pages are in real RAM after this. Better than nothing.

    for (i=0; i<size; i+=1024)
      buf[i] = 0;
  }

  fifo->size = size;

  return 0; // Success
}

void fifo_done(struct xillyfifo *fifo) {
  fifo->done = 1;
  sem_post(&fifo->read_sem);
  sem_post(&fifo->write_sem);
}

void fifo_destroy(struct xillyfifo *fifo) {
  if (!fifo->baseaddr)
    return; // Better safe than SEGV

  munlock(fifo->baseaddr, fifo->size);
  free(fifo->baseaddr);
 
  sem_destroy(&fifo->read_sem);
  sem_destroy(&fifo->write_sem);

  fifo->baseaddr = NULL;
}

int fifo_request_drain(struct xillyfifo *fifo,
             struct xillyinfo *info) {
  int taken = 0;
  unsigned int now_bytes, max_bytes;

  info->slept = 0;
  info->addr = NULL;

  now_bytes = __sync_add_and_fetch(&fifo->bytes_in_fifo, 0);

  while (now_bytes == 0) {
    if (fifo->done)
      goto fail; // FIFO will not be used by other side, and is empty

    // fifo_wrote() updates bytes_in_fifo and then increments semaphore,
    // so there's no chance for oversleeping. On the other hand, it's
    // possible that the data was drained between the bytes_in_fifo
    // update and the semaphore increment, leading to a false wakeup.
    // That's why we're in a while loop ( + other race conditions).
   
    info->slept = 1;

    if (sem_wait(&fifo->read_sem) && (errno != EINTR))
      goto fail;

    now_bytes = __sync_add_and_fetch(&fifo->bytes_in_fifo, 0);
  }

  max_bytes = fifo->size - fifo->read_position;
  taken = (now_bytes < max_bytes) ? now_bytes : max_bytes;
  info->addr = fifo->baseaddr + fifo->read_position;

 fail:
  info->bytes = taken;
  info->position = fifo->read_position;

  return taken;
}

void fifo_drained(struct xillyfifo *fifo,
       unsigned int req_bytes) {

  int semval;

  if (req_bytes == 0)
    return;

  __sync_sub_and_fetch(&fifo->bytes_in_fifo, req_bytes);
  __sync_add_and_fetch(&fifo->read_total, req_bytes);
 
  fifo->read_position += req_bytes;

  if (fifo->read_position >= fifo->size)
    fifo->read_position -= fifo->size;

  if (sem_getvalue(&fifo->write_sem, &semval))
    semval = 1; // This fallback should never happen

  // Don't increment the semaphore if it's nonzero anyhow. The possible
  // race condition between reading and possibly incrementing has no effect.

  if (semval == 0)
    sem_post(&fifo->write_sem);
}

int fifo_request_write(struct xillyfifo *fifo,
             struct xillyinfo *info) {
  int taken = 0;
  unsigned int now_bytes, max_bytes;

  info->slept = 0;
  info->addr = NULL;

  now_bytes = __sync_add_and_fetch(&fifo->bytes_in_fifo, 0);

  if (fifo->done)
    goto fail; // No point filling an abandoned FIFO

  while (now_bytes >= (fifo->size - FIFO_BACKOFF)) {
    // fifo_drained() updates bytes_in_fifo and then increments semaphore,
    // so there's no chance for oversleeping. On the other hand, it's
    // possible that the data was drained between the bytes_in_fifo
    // update and the semaphore increment, leading to a false wakeup.
    // That's why we're in a while loop ( + other race conditions).

    info->slept = 1;

    if (sem_wait(&fifo->write_sem) && (errno != EINTR))
      goto fail;
 
    if (fifo->done)
      goto fail; // No point filling an abandoned FIFO

    now_bytes = __sync_add_and_fetch(&fifo->bytes_in_fifo, 0);
  }

  taken = fifo->size - (now_bytes + FIFO_BACKOFF);

  max_bytes = fifo->size - fifo->write_position;

  if (taken > max_bytes)
    taken = max_bytes;
  info->addr = fifo->baseaddr + fifo->write_position;

 fail:
  info->bytes = taken;
  info->position = fifo->write_position;

  return taken;
}

void fifo_wrote(struct xillyfifo *fifo,
       unsigned int req_bytes) {

  int semval;

  if (req_bytes == 0)
    return;

  __sync_add_and_fetch(&fifo->bytes_in_fifo, req_bytes);
  __sync_add_and_fetch(&fifo->write_total, req_bytes);
 
  fifo->write_position += req_bytes;
 
  if (fifo->write_position >= fifo->size)
    fifo->write_position -= fifo->size;
 
  if (sem_getvalue(&fifo->read_sem, &semval))
    semval = 1; // This fallback should never happen

  // Don't increment the semaphore if it's nonzero anyhow. The possible
  // race condition between reading and possibly incrementing has no effect.

  if (semval == 0)
    sem_post(&fifo->read_sem);
}

/*********************************************************************
 *                                                                   *
 *                 A P P L I C A T I O N   C O D E                   *
 *                                                                   *
 *********************************************************************/

// Read from FIFO, write to write_fd (standard output OR /dev/xillybus_write_32)

void *write_thread(void *arg)
{
  struct xillyfifo *fifo = arg;
  int do_bytes, written_bytes;
  struct xillyinfo info;
  unsigned char *buf;

  while (1) {
    do_bytes = fifo_request_drain(fifo, &info);

    if (do_bytes == 0)
      return NULL;

    for (buf = info.addr; do_bytes > 0;
    buf += written_bytes, do_bytes -= written_bytes) {   // here, info.addr refers to read pointer of the FIFO

      written_bytes = write(write_fd, buf, do_bytes);

      if ((written_bytes < 0) && (errno != EINTR)) {
   perror("write() failed");
   return NULL;
      }

      if (written_bytes == 0) {
   fprintf(stderr, "Reached write EOF (?!)\n");
   fifo_done(fifo);
   return NULL;
      }

      if (written_bytes < 0) { // errno is EINTR
   written_bytes = 0;
   continue;
      }
     
      fifo_drained(fifo, written_bytes);
    }
  }
}

// Write to FIFO, read from read_fd (standard output OR /dev/xillybus_read_32)

void *read_thread(void *arg)
{
  struct xillyfifo *fifo = arg;
  int do_bytes, read_bytes;
  struct xillyinfo info;
  unsigned char *buf;

  while (1) {
    do_bytes = fifo_request_write(fifo, &info);

    if (do_bytes == 0)
      return NULL;

    for (buf = info.addr; do_bytes > 0;
    buf += read_bytes, do_bytes -= read_bytes) {   // here, info.addr refers to write pointer of the FIFO

      read_bytes = read(read_fd, buf, do_bytes);

      if ((read_bytes < 0) && (errno != EINTR)) {
   perror("read() failed");
   return NULL;
      }

      if (read_bytes == 0) {
   // Reached EOF. Quit without complaining.
   fifo_done(fifo);
   return NULL;
      }

      if (read_bytes < 0) { // errno is EINTR
   read_bytes = 0;
   continue;
      }
     
      fifo_wrote(fifo, read_bytes);
    }
  }
}


void *status_thread(void *arg) {
  struct xillyfifo *fifo = arg;
 
  while (fifo->done < 2)
  {
    fprintf(stderr, "%9d bytes in FIFO, %12ld read, %12ld written\r",
       __sync_add_and_fetch(&fifo->bytes_in_fifo, 0),
       __sync_add_and_fetch(&fifo->read_total, 0),
       __sync_add_and_fetch(&fifo->write_total, 0)      
       );

    //////////////////////////// Data Integrity Check //////////////////////////////////////
    unsigned char *buf = fifo->baseaddr;
    int success = 1;

    if( buf[fifo->read_position] != buf[fifo->write_position] )
    {
      success = 0;
      //printf("read ID %d :%d \n\r",i,array_hardware[i]);
      printf("Test is unsuccessful!\n\r");
      /*for(i=0; i<N; i++){
      printf("o/p from p1 is %d\n\r",array_hardware[i]);
      }*/
    }
    ////////////////////////////////////////////////////////////////////////////////////////

  }

  return NULL;
}

int main(int argc, char *argv[]) {
  pthread_t tid[3];
  struct xillyfifo fifo;
  unsigned int fifo_size;

  if ((argc != 2) && (argc != 3)) {
    fprintf(stderr, "Usage: %s fifo_size [read-file]\n", argv[0]);
    exit(1);
  }

  fifo_size = atoi(argv[1]);

  if (fifo_size == 0) {
    fprintf(stderr, "Bad fifo_size argument %s\n", argv[1]);
    exit(1);
  }

  if (fifo_init(&fifo, fifo_size)) {
    perror("Failed to init");
    exit(1);
  }
 
  ///////////////////////// Open read and write file descriptors /////////////////////////
  if (argc > 2) {
    read_fd = open(argv[2], O_RDONLY | O_NONBLOCK);  // for loopback, use /dev/stdout
  }
  else {
    read_fd = open("/dev/xillybus_read_32", O_RDONLY | O_NONBLOCK);
  }
  if (read_fd < 0) {
    perror("Failed to open read file");
    exit(1);
  }
  if (argc > 3) {
    write_fd = open(argv[3], O_WRONLY | O_NONBLOCK); // for loopback, use /dev/stdout
  }
  else {
    write_fd = open("/dev/xillybus_write_32", O_WRONLY | O_NONBLOCK);
  }
  if (write_fd < 0) {
    perror("Failed to open write file");
    exit(1);
  }
  ////////////////////////////////////////////////////////////////////////////////////////

  if (pthread_create(&tid[0], NULL, read_thread, &fifo)) {
    perror("Failed to create thread");
    exit(1);
  }

  if (pthread_create(&tid[1], NULL, write_thread, &fifo)) {
    perror("Failed to create thread");
    exit(1);
  }

  if (pthread_create(&tid[2], NULL, status_thread, &fifo)) {
    perror("Failed to create thread");
    exit(1);
  }

  pthread_join(tid[0], NULL);
  pthread_join(tid[1], NULL);

  fifo.done = 2; // This is a hack for the status thread
  pthread_join(tid[2], NULL);

  fifo_destroy(&fifo);

  pthread_exit(NULL);

  return 0;
}

Re: Bandwidth comparison

Post by kevin »

[quote="kevin"]Why does https://gist.github.com/promach/1d57d6dda05ff4a21f93b8c3ccc111f7/bfc710ea85e859f6c6d0415c24f2e1d769edfe72#file-test_lpthread-c-L105 fail to pass the data integrity check under loopback condition ?

I mean for data_size of larger than 1M integers, which is 4M bytes

Note: data_size smaller than 1M integers does not have such data integrity problem.

Re: Bandwidth comparison

Post by kevin »

Why does https://gist.github.com/promach/1d57d6dda05ff4a21f93b8c3ccc111f7/bfc710ea85e859f6c6d0415c24f2e1d769edfe72#file-test_lpthread-c-L105 fail to pass the data integrity check under loopback condition ?

Code: Select all
#include <stdio.h>
#include <pthread.h>
#include <stdlib.h>
#include <unistd.h>
#include <errno.h>
#include <sys/types.h>
#include <sys/stat.h>
#include <fcntl.h>
#include <string.h>
#include <stdint.h>
#include <sys/time.h>

#define VP void*
#define NTH 2

int fdw32 = 0;
int fdr32 = 0;

int N = 0;
int i;

int *array_input;
int *array_hardware;

struct timeval tv1,tv2;
ssize_t w,e,r;

VP sample_1(VP arg);
VP sample_2(VP arg);

VP sample_1(VP arg) {
   w = write(fdw32, array_input, sizeof(int)*N);
   e = write(fdw32, NULL, 0);
   pthread_exit(NULL);
}

VP sample_2(VP arg) {
        r = read(fdr32, array_hardware, sizeof(int)*N);
        pthread_exit(NULL);
}


//int main(void)
int main(int argc, char *argv[])
{
        fdw32 = open("/dev/xillybus_write_32", O_WRONLY);
        fdr32 = open("/dev/xillybus_read_32", O_RDONLY);

   N = atoi(argv[1]);
   //N = N*4;

        if (fdw32<0 || fdr32<0) {
                perror("Failed to open devfiles");
                exit(1);
        }
   
   //printf("sizeof(short) = %d\r\n", sizeof(short));
   //printf("sizeof(int) = %d\r\n", sizeof(int));

        //allocate memory
        array_input = (int*) malloc(N*sizeof(int));
        array_hardware = (int*) malloc(N*sizeof(int));

        // generate inputs and prepare outputs
        for(i=0; i<N; i++){
                array_input[i] = i;
                array_hardware[i] = 0;
        }

   pthread_t tid[NTH];
   int loop = 0;
   int value[NTH] = {1,2};
   //printf("\n Going to create threads \n");
   /** Creation of threads*/

/*   for(loop=0; loop<NTH; loop++) {
      pthread_create(&tid[loop], NULL, &sample, &value[loop]);
      printf("\n value of loop = %d\n", loop);
   }
*/
   //gettimeofday(&tv1, NULL);
   pthread_create(&tid[0], NULL, &sample_1, &value[0]);
          //printf("\n value of loop = %d\n", 0);
   
   pthread_create(&tid[1], NULL, &sample_2, &value[1]);
        //printf("\n value of loop = %d\n", 1);

   /** Synch of threads in order to exit normally*/
   gettimeofday(&tv1, NULL);
   for(loop=0; loop<NTH; loop++) {
      pthread_join(tid[loop], NULL);
   }
   gettimeofday(&tv2, NULL);
   printf("%f\n\r", (double)1000000*(tv2.tv_sec-tv1.tv_sec)+(tv2.tv_usec-tv1.tv_usec));

   int success = 1;
   for(i=0;i<N;i++){
      if(array_input[i]!= array_hardware[i]){
         success = 0;
         printf("read ID %d :%d \n\r",i,array_hardware[i]);
         break;
      }
   }
   if (success == 0){
      printf("Test is unsuccessful!\n\r");

      /*for(i=0; i<N; i++){
                   printf("o/p from p1 is %d\n\r",array_hardware[i]);
      }*/
        }
   
   close(fdw32);
   close(fdr32);

   free(array_input);
   free(array_hardware);
      
   return EXIT_SUCCESS;
//   pthread_exit(NULL);
}

Re: Bandwidth comparison

Post by support »

Indeed, you don't. There's a small FIFO on the FPGA, and a large DMA buffer maintained by the host driver.

In Xillybus' guidelines for measuring bandwidth there is no software FIFO mentioned:

http://xillybus.com/doc/bandwidth-guidelines

Regards,
Eli

Re: Bandwidth comparison

Post by kevin »

Why do we need fifo buffering even for C-code ? I thought this FIFO is to be implemented in FPGA application logic ?

Re: Bandwidth comparison

Post by kevin »

Now all sample length except those larger than 1M pass the comparison test. May I know why ?

I have https://paste.ubuntu.com/26346293/ with https://gist.github.com/promach/3751054163042d255f818724934799ed

(gdb) run 1048576
Starting program: /root/phung/dpoverlay/fifo 1048576
[Thread debugging using libthread_db enabled]
Using host libthread_db library "/lib/arm-linux-gnueabihf/libthread_db.so.1".
[New Thread 0xb6bcd470 (LWP 2494)]
[New Thread 0xb63cd470 (LWP 2495)]
[Thread 0xb6bcd470 (LWP 2494) exited]
[Thread 0xb63cd470 (LWP 2495) exited]
[New Thread 0xb5bcd470 (LWP 2496)]
37.000000
[Thread 0xb5bcd470 (LWP 2496) exited] read, 1048576 written

Program received signal SIGSEGV, Segmentation fault.
0x0000945c in main (argc=2, argv=0xbefff7a4) at fifo.c:457
457 if( array_input[i] != array_hardware[i] ){
(gdb) print i
$5 = 880632
(gdb) print array_input[i]
$6 = 41 ')'
(gdb) print array_input[i-1]
$7 = 0 '\000'
(gdb) print array_input[i-2]
$8 = 12 '\f'
(gdb)


Code: Select all
#include <pthread.h>
#include <semaphore.h>
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <errno.h>
#include <sys/mman.h>
#include <sys/types.h>
#include <sys/stat.h>
#include <fcntl.h>
#include <string.h>
#include <stdint.h>
#include <sys/time.h>

/*********************************************************************
 *                                                                   *
 *                 D E C L A R A T I O N S                           *
 *                                                                   *
 *********************************************************************/

struct xillyfifo {
  unsigned long read_total;
  unsigned long write_total;
  unsigned int bytes_in_fifo;
  unsigned int read_position;
  unsigned int write_position;
  unsigned int size;
  unsigned int done;
  unsigned char *baseaddr;
  sem_t write_sem;
  sem_t read_sem;   
};

struct xillyinfo {
  int slept;
  int bytes;
  int position;
  void *addr;
};

#define FIFO_BACKOFF 0
static int read_fd = 0;
static int write_fd = 1;

// pointers to two buffers
unsigned char *array_input;
unsigned char *array_hardware;
int i;  // to address the indexes of memory data pointed by 'array_input' and 'array_hardware' for initialization and comparison purposes

struct timeval tv1,tv2;  // for computing execution time

/*********************************************************************
 *                                                                   *
 *                 A P I   F U N C T I O N S                         *
 *                                                                   *
 *********************************************************************/

// IMPORTANT:
// =========
//
// NEITHER of the fifo_* functions is reentrant. Only one thread should have
// access to any set of them. This is pretty straightforward when one thread
// writes and one thread reads from the FIFO.
//
// Also make sure that fifo_drained() and fifo_wrote() are NEVER called with
// req_bytes larger than what their request-counterparts RETURNED, or
// things will go crazy pretty soon.


int fifo_init(struct xillyfifo *fifo,
         unsigned int size) {

  fifo->baseaddr = NULL;
  fifo->size = 0;
  fifo->bytes_in_fifo = 0;
  fifo->read_position = 0;
  fifo->write_position = 0;
  fifo->read_total = 0;
  fifo->write_total = 0;
  fifo->done = 0;

  if (sem_init(&fifo->read_sem, 0, 0) == -1)
    return -1; // Fail!

  if (sem_init(&fifo->write_sem, 0, 1) == -1)
    return -1;
 
  fifo->baseaddr = malloc(size);

  if (!fifo->baseaddr)
    return -1;

  if (mlock(fifo->baseaddr, size)) {
    unsigned int i;
    unsigned char *buf = fifo->baseaddr;

    fprintf(stderr, "Warning: Failed to lock RAM, so FIFO's memory may swap to disk.\n"
       "(You may want to use ulimit -l)\n");

    // Write something every 1024 bytes (4096 should be OK, actually).
    // Hopefully all pages are in real RAM after this. Better than nothing.

    for (i=0; i<size; i+=1024)
      buf[i] = 0;
  }

  fifo->size = size;

  return 0; // Success
}

void fifo_done(struct xillyfifo *fifo) {
  fifo->done = 1;
  sem_post(&fifo->read_sem);
  sem_post(&fifo->write_sem);
}

void fifo_destroy(struct xillyfifo *fifo) {
  if (!fifo->baseaddr)
    return; // Better safe than SEGV

  munlock(fifo->baseaddr, fifo->size);
  free(fifo->baseaddr);
 
  sem_destroy(&fifo->read_sem);
  sem_destroy(&fifo->write_sem);

  fifo->baseaddr = NULL;
}

int fifo_request_drain(struct xillyfifo *fifo,
             struct xillyinfo *info) {
  int taken = 0;
  unsigned int now_bytes, max_bytes;

  info->slept = 0;
  info->addr = NULL;

  now_bytes = __sync_add_and_fetch(&fifo->bytes_in_fifo, 0);

  while (now_bytes == 0) {
    if (fifo->done)
      goto fail; // FIFO will not be used by other side, and is empty

    // fifo_wrote() updates bytes_in_fifo and then increments semaphore,
    // so there's no chance for oversleeping. On the other hand, it's
    // possible that the data was drained between the bytes_in_fifo
    // update and the semaphore increment, leading to a false wakeup.
    // That's why we're in a while loop ( + other race conditions).
   
    info->slept = 1;

    if (sem_wait(&fifo->read_sem) && (errno != EINTR))
      goto fail;

    now_bytes = __sync_add_and_fetch(&fifo->bytes_in_fifo, 0);
  }

  max_bytes = fifo->size - fifo->read_position;
  taken = (now_bytes < max_bytes) ? now_bytes : max_bytes;
  info->addr = fifo->baseaddr + fifo->read_position;

 fail:
  info->bytes = taken;
  info->position = fifo->read_position;

  return taken;
}

void fifo_drained(struct xillyfifo *fifo,
       unsigned int req_bytes) {

  int semval;

  if (req_bytes == 0)
    return;

  __sync_sub_and_fetch(&fifo->bytes_in_fifo, req_bytes);
  __sync_add_and_fetch(&fifo->read_total, req_bytes);
 
  fifo->read_position += req_bytes;

  if (fifo->read_position >= fifo->size)
    fifo->read_position -= fifo->size;

  if (sem_getvalue(&fifo->write_sem, &semval))
    semval = 1; // This fallback should never happen

  // Don't increment the semaphore if it's nonzero anyhow. The possible
  // race condition between reading and possibly incrementing has no effect.

  if (semval == 0)
    sem_post(&fifo->write_sem);
}

int fifo_request_write(struct xillyfifo *fifo,
             struct xillyinfo *info) {
  int taken = 0;
  unsigned int now_bytes, max_bytes;

  info->slept = 0;
  info->addr = NULL;

  now_bytes = __sync_add_and_fetch(&fifo->bytes_in_fifo, 0);

  if (fifo->done)
    goto fail; // No point filling an abandoned FIFO

  while (now_bytes >= (fifo->size - FIFO_BACKOFF)) {
    // fifo_drained() updates bytes_in_fifo and then increments semaphore,
    // so there's no chance for oversleeping. On the other hand, it's
    // possible that the data was written between the bytes_in_fifo
    // update and the semaphore increment, leading to a false wakeup.
    // That's why we're in a while loop ( + other race conditions).
    // Two wakeup conditions, 1) FIFO is not full   2)read/drain finishes    After drain finishes, write thread will wakeup.  However before wakeup, FIFO is full again,so this is a false wakeup.  This while loop guarantees write thread continues to sleep when false wakeup occurs.
    info->slept = 1;

    if (sem_wait(&fifo->write_sem) && (errno != EINTR))  // with the while loop and read_sem had not incremented semaphore, write thread goes into sem_wait() and continues to sleep (returns 'taken' as 0).
      goto fail;
 
    if (fifo->done)
      goto fail; // No point filling an abandoned FIFO

    now_bytes = __sync_add_and_fetch(&fifo->bytes_in_fifo, 0);
  }

  taken = fifo->size - (now_bytes + FIFO_BACKOFF);

  max_bytes = fifo->size - fifo->write_position;

  if (taken > max_bytes)
    taken = max_bytes;
  info->addr = fifo->baseaddr + fifo->write_position;

 fail:
  info->bytes = taken;
  info->position = fifo->write_position;

  return taken;
}

void fifo_wrote(struct xillyfifo *fifo,
       unsigned int req_bytes) {

  int semval;

  if (req_bytes == 0)
    return;

  __sync_add_and_fetch(&fifo->bytes_in_fifo, req_bytes);
  __sync_add_and_fetch(&fifo->write_total, req_bytes);
 
  fifo->write_position += req_bytes;
 
  if (fifo->write_position >= fifo->size)
    fifo->write_position -= fifo->size;
 
  if (sem_getvalue(&fifo->read_sem, &semval))
    semval = 1; // This fallback should never happen

  // Don't increment the semaphore if it's nonzero anyhow. The possible
  // race condition between reading and possibly incrementing has no effect.

  if (semval == 0)
    sem_post(&fifo->read_sem);
}

/*********************************************************************
 *                                                                   *
 *                 A P P L I C A T I O N   C O D E                   *
 *                                                                   *
 *********************************************************************/

// Read from FIFO, write to standard output

void *write_thread(void *arg)
{
  struct xillyfifo *fifo = arg;
  int do_bytes, written_bytes;
  struct xillyinfo info;

  do_bytes = fifo_request_drain(fifo, &info);

  while (do_bytes > 0) {
    do_bytes = fifo_request_drain(fifo, &info);

    if (do_bytes == 0)
      return NULL;

    for (array_input = info.addr; do_bytes > 0;
    array_input += written_bytes, do_bytes -= written_bytes) {

      written_bytes = write(write_fd, array_input, do_bytes);

      if ((written_bytes < 0) && (errno != EINTR)) {
   perror("write() failed");
   return NULL;
      }

      if (written_bytes == 0) {
   fprintf(stderr, "Reached write EOF (?!)\n");
   fifo_done(fifo);
   return NULL;
      }

      if (written_bytes < 0) { // errno is EINTR
   written_bytes = 0;
   continue;
      }
     
      fifo_drained(fifo, written_bytes);
    }
  }
}

// Write to FIFO, read from standard output

void *read_thread(void *arg)
{
  struct xillyfifo *fifo = arg;
  int do_bytes, read_bytes;
  struct xillyinfo info;

  do_bytes = fifo_request_write(fifo, &info);

  while (do_bytes > 0) {
    do_bytes = fifo_request_write(fifo, &info);

    if (do_bytes == 0)
      return NULL;

    for (array_hardware = info.addr; do_bytes > 0;
    array_hardware += read_bytes, do_bytes -= read_bytes) {

      read_bytes = read(read_fd, array_hardware, do_bytes);

      if ((read_bytes < 0) && (errno != EINTR)) {
         perror("read() failed");
         return NULL;
      }

      if (read_bytes == 0) {
         // Reached EOF. Quit without complaining.
         fifo_done(fifo);
         return NULL;
      }

      if (read_bytes < 0) { // errno is EINTR
         read_bytes = 0;
         continue;
      }
     
      fifo_wrote(fifo, read_bytes);
    }
  }
}


void *status_thread(void *arg) {
  struct xillyfifo *fifo = arg;
 
  while (fifo->done < 2)
    fprintf(stderr, "%9d bytes in FIFO, %12ld read, %12ld written\r",
       __sync_add_and_fetch(&fifo->bytes_in_fifo, 0),
       __sync_add_and_fetch(&fifo->read_total, 0),
       __sync_add_and_fetch(&fifo->write_total, 0)      
       );
  return NULL;
}

int main(int argc, char *argv[]) {
  pthread_t tid[3];
  struct xillyfifo fifo;
  unsigned int fifo_size;

  if ((argc != 2) && (argc != 3) && (argc != 4)) {
    printf("argc = %d\r\n", argc);
    fprintf(stderr, "Usage: %s fifo_size [read-file]\n", argv[0]);
    exit(1);
  }

  fifo_size = atoi(argv[1]);

  if (fifo_size == 0) {
    fprintf(stderr, "Bad fifo_size argument %s\n", argv[1]);
    exit(1);
  }

  if (fifo_init(&fifo, fifo_size)) {
    perror("Failed to init");
    exit(1);
  }

  //allocate memory
  array_input = (unsigned char*) malloc(fifo_size*sizeof(unsigned char));
  array_hardware = (unsigned char*) malloc(fifo_size*sizeof(unsigned char));

  // generate inputs and prepare outputs
  for(i=0; i<fifo_size; i++){
    array_input[i] = i;
    array_hardware[i] = 0;
  }

  if (argc > 2) {
    read_fd = open(argv[2], O_RDONLY);
  }
  else {
    read_fd = open("/dev/xillybus_read_32", O_RDONLY);
  }

  if (read_fd < 0) {
    perror("Failed to open read file");
    exit(1);
  }

  if (argc > 3) {
    write_fd = open(argv[3], O_WRONLY);
  }
  else {
    write_fd = open("/dev/xillybus_write_32", O_WRONLY);
  }

  if (write_fd < 0) {
    perror("Failed to open write file");
    exit(1);
  }

  if (pthread_create(&tid[0], NULL, read_thread, &fifo)) {
    perror("Failed to create thread");
    exit(1);
  }

  if (pthread_create(&tid[1], NULL, write_thread, &fifo)) {
    perror("Failed to create thread");
    exit(1);
  }

  if (pthread_create(&tid[2], NULL, status_thread, &fifo)) {
    perror("Failed to create thread");
    exit(1);
  }

  /** Synch of threads in order to exit normally*/
  gettimeofday(&tv1, NULL);

  pthread_join(tid[0], NULL);
  pthread_join(tid[1], NULL);

  gettimeofday(&tv2, NULL);
  printf("%f\n\r", (double)1000000*(tv2.tv_sec-tv1.tv_sec)+(tv2.tv_usec-tv1.tv_usec));

  fifo.done = 2; // This is a hack for the status thread
  pthread_join(tid[2], NULL);

  int success = 1;
  for(i=0;i<fifo_size;i++){
    if( array_input[i] != array_hardware[i] ){
      success = 0;
    }
    //printf("read ID %d :%d \n\r",i,array_hardware[i]);
  }
  if (success == 1)
    printf("Test is successful!!\n\r");
  else
    printf("Test is unsuccessful!\n\r");

  fifo_destroy(&fifo);

  pthread_exit(NULL);

  return 0;
}

Re: Bandwidth comparison

Post by Guest »

I have found the reason for significant difference in bandwidth result. It is due to inclusion of pthread_join() for status_thread

By the way, why do we need the "for loop" for the write() function as in https://gist.github.com/promach/3751054163042d255f818724934799ed#file-fifo-c-L291-L292 ?

https://github.com/louislxw/xillybus/blob/master/shen/dpoverlay/test_lpthread.c#L36 does not need the "for loop"

Bandwidth comparison

Post by kevin »

I have two sets of codes that measure the execution time of multithreading.

However, both codes gave me significantly different bandwidth given the same fifo/memory size

May I know if I have modified your fifo.c under xillybus_demoapps incorrectly ?

Multithreading only
https://github.com/louislxw/xillybus/blob/master/shen/dpoverlay/test_lpthread.c
Code: Select all
/** Simple multi-threaded application explains how to create apps with multiple
 * threads and also explains about how to deal with args parameter of pthread_create
 * Owner Manikandan Govindarajan <govi0009@e.ntu.edu.sg>
 */
#include <stdio.h>
#include <pthread.h>
#include <stdlib.h>
#include <unistd.h>
#include <errno.h>
#include <sys/types.h>
#include <sys/stat.h>
#include <fcntl.h>
#include <string.h>
#include <stdint.h>
#include <sys/time.h>

#define VP void*
#define NTH 2

int fdw32 = 0;
int fdr32 = 0;

int N = 0;
int i;

int *array_input;
int *array_hardware;

struct timeval tv1,tv2;
ssize_t w,e,r;

VP sample_1(VP arg);
VP sample_2(VP arg);

VP sample_1(VP arg) {
   w = write(fdw32, array_input, sizeof(short)*N);
   e = write(fdw32, NULL, 0);
   pthread_exit(NULL);
}

VP sample_2(VP arg) {
        r = read(fdr32, array_hardware, sizeof(short)*N);
        pthread_exit(NULL);
}


//int main(void)
int main(int argc, char *argv[])
{
        fdw32 = open("/dev/xillybus_write_32", O_WRONLY);
        fdr32 = open("/dev/xillybus_read_32", O_RDONLY);

   N = atoi(argv[1]);
   N = N*4;

        if (fdw32<0 || fdr32<0) {
                perror("Failed to open devfiles");
                exit(1);
        }

        //allocate memory
        array_input = (int*) malloc(N*sizeof(int));
        array_hardware = (int*) malloc(N*sizeof(int));

        // generate inputs and prepare outputs
        for(i=0; i<N; i++){
                array_input[i] = i;
                array_hardware[i] = 0;
        }

   pthread_t tid[NTH];
   int loop = 0;
   int value[NTH] = {1,2};
   //printf("\n Going to create threads \n");
   /** Creation of threads*/

/*   for(loop=0; loop<NTH; loop++) {
      pthread_create(&tid[loop], NULL, &sample, &value[loop]);
      printf("\n value of loop = %d\n", loop);
   }
*/
   //gettimeofday(&tv1, NULL);
   pthread_create(&tid[0], NULL, &sample_1, &value[0]);
          //printf("\n value of loop = %d\n", 0);
   
   pthread_create(&tid[1], NULL, &sample_2, &value[1]);
        //printf("\n value of loop = %d\n", 1);

   /** Synch of threads in order to exit normally*/
   gettimeofday(&tv1, NULL);
   for(loop=0; loop<NTH; loop++) {
      pthread_join(tid[loop], NULL);
   }
   gettimeofday(&tv2, NULL);
   printf("%f\n\r", (double)1000000*(tv2.tv_sec-tv1.tv_sec)+(tv2.tv_usec-tv1.tv_usec));

/*   for(i=0; i<N/4; i++){
                printf("o/p from p1 is %d\n\r",array_hardware[i]);
        }
*/   
   close(fdw32);
   close(fdr32);

   free(array_input);
   free(array_hardware);
      
   return EXIT_SUCCESS;
//   pthread_exit(NULL);
}


Multithreading and semaphore for false wakeup
https://gist.github.com/promach/3751054163042d255f818724934799ed
Code: Select all
#include <pthread.h>
#include <semaphore.h>
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <errno.h>
#include <sys/mman.h>
#include <sys/types.h>
#include <sys/stat.h>
#include <fcntl.h>
#include <string.h>
#include <stdint.h>
#include <sys/time.h>

/*********************************************************************
 *                                                                   *
 *                 D E C L A R A T I O N S                           *
 *                                                                   *
 *********************************************************************/

struct xillyfifo {
  unsigned long read_total;
  unsigned long write_total;
  unsigned int bytes_in_fifo;
  unsigned int read_position;
  unsigned int write_position;
  unsigned int size;
  unsigned int done;
  unsigned char *baseaddr;
  sem_t write_sem;
  sem_t read_sem;   
};

struct xillyinfo {
  int slept;
  int bytes;
  int position;
  void *addr;
};

#define FIFO_BACKOFF 0
static int read_fd = 0;
static int write_fd = 1;

struct timeval tv1,tv2;  // for computing execution time

/*********************************************************************
 *                                                                   *
 *                 A P I   F U N C T I O N S                         *
 *                                                                   *
 *********************************************************************/

// IMPORTANT:
// =========
//
// NEITHER of the fifo_* functions is reentrant. Only one thread should have
// access to any set of them. This is pretty straightforward when one thread
// writes and one thread reads from the FIFO.
//
// Also make sure that fifo_drained() and fifo_wrote() are NEVER called with
// req_bytes larger than what their request-counterparts RETURNED, or
// things will go crazy pretty soon.


int fifo_init(struct xillyfifo *fifo,
         unsigned int size) {

  fifo->baseaddr = NULL;
  fifo->size = 0;
  fifo->bytes_in_fifo = 0;
  fifo->read_position = 0;
  fifo->write_position = 0;
  fifo->read_total = 0;
  fifo->write_total = 0;
  fifo->done = 0;

  if (sem_init(&fifo->read_sem, 0, 0) == -1)
    return -1; // Fail!

  if (sem_init(&fifo->write_sem, 0, 1) == -1)
    return -1;
 
  fifo->baseaddr = malloc(size);

  if (!fifo->baseaddr)
    return -1;

  if (mlock(fifo->baseaddr, size)) {
    unsigned int i;
    unsigned char *buf = fifo->baseaddr;

    fprintf(stderr, "Warning: Failed to lock RAM, so FIFO's memory may swap to disk.\n"
       "(You may want to use ulimit -l)\n");

    // Write something every 1024 bytes (4096 should be OK, actually).
    // Hopefully all pages are in real RAM after this. Better than nothing.

    for (i=0; i<size; i+=1024)
      buf[i] = 0;
  }

  fifo->size = size;

  return 0; // Success
}

void fifo_done(struct xillyfifo *fifo) {
  fifo->done = 1;
  sem_post(&fifo->read_sem);
  sem_post(&fifo->write_sem);
}

void fifo_destroy(struct xillyfifo *fifo) {
  if (!fifo->baseaddr)
    return; // Better safe than SEGV

  munlock(fifo->baseaddr, fifo->size);
  free(fifo->baseaddr);
 
  sem_destroy(&fifo->read_sem);
  sem_destroy(&fifo->write_sem);

  fifo->baseaddr = NULL;
}

int fifo_request_drain(struct xillyfifo *fifo,
             struct xillyinfo *info) {
  int taken = 0;
  unsigned int now_bytes, max_bytes;

  info->slept = 0;
  info->addr = NULL;

  now_bytes = __sync_add_and_fetch(&fifo->bytes_in_fifo, 0);

  while (now_bytes == 0) {
    if (fifo->done)
      goto fail; // FIFO will not be used by other side, and is empty

    // fifo_wrote() updates bytes_in_fifo and then increments semaphore,
    // so there's no chance for oversleeping. On the other hand, it's
    // possible that the data was drained between the bytes_in_fifo
    // update and the semaphore increment, leading to a false wakeup.
    // That's why we're in a while loop ( + other race conditions).
   
    info->slept = 1;

    if (sem_wait(&fifo->read_sem) && (errno != EINTR))
      goto fail;

    now_bytes = __sync_add_and_fetch(&fifo->bytes_in_fifo, 0);
  }

  max_bytes = fifo->size - fifo->read_position;
  taken = (now_bytes < max_bytes) ? now_bytes : max_bytes;
  info->addr = fifo->baseaddr + fifo->read_position;

 fail:
  info->bytes = taken;
  info->position = fifo->read_position;

  return taken;
}

void fifo_drained(struct xillyfifo *fifo,
       unsigned int req_bytes) {

  int semval;

  if (req_bytes == 0)
    return;

  __sync_sub_and_fetch(&fifo->bytes_in_fifo, req_bytes);
  __sync_add_and_fetch(&fifo->read_total, req_bytes);
 
  fifo->read_position += req_bytes;

  if (fifo->read_position >= fifo->size)
    fifo->read_position -= fifo->size;

  if (sem_getvalue(&fifo->write_sem, &semval))
    semval = 1; // This fallback should never happen

  // Don't increment the semaphore if it's nonzero anyhow. The possible
  // race condition between reading and possibly incrementing has no effect.

  if (semval == 0)
    sem_post(&fifo->write_sem);
}

int fifo_request_write(struct xillyfifo *fifo,
             struct xillyinfo *info) {
  int taken = 0;
  unsigned int now_bytes, max_bytes;

  info->slept = 0;
  info->addr = NULL;

  now_bytes = __sync_add_and_fetch(&fifo->bytes_in_fifo, 0);

  if (fifo->done)
    goto fail; // No point filling an abandoned FIFO

  while (now_bytes >= (fifo->size - FIFO_BACKOFF)) {
    // fifo_drained() updates bytes_in_fifo and then increments semaphore,
    // so there's no chance for oversleeping. On the other hand, it's
    // possible that the data was written between the bytes_in_fifo
    // update and the semaphore increment, leading to a false wakeup.
    // That's why we're in a while loop ( + other race conditions).
    // Two wakeup conditions, 1) FIFO is not full   2)read/drain finishes    After drain finishes, write thread will wakeup.  However before wakeup, FIFO is full again,so this is a false wakeup.  This while loop guarantees write thread continues to sleep when false wakeup occurs.
    info->slept = 1;

    if (sem_wait(&fifo->write_sem) && (errno != EINTR))  // with the while loop and read_sem had not incremented semaphore, write thread goes into sem_wait() and continues to sleep (returns 'taken' as 0).
      goto fail;
 
    if (fifo->done)
      goto fail; // No point filling an abandoned FIFO

    now_bytes = __sync_add_and_fetch(&fifo->bytes_in_fifo, 0);
  }

  taken = fifo->size - (now_bytes + FIFO_BACKOFF);

  max_bytes = fifo->size - fifo->write_position;

  if (taken > max_bytes)
    taken = max_bytes;
  info->addr = fifo->baseaddr + fifo->write_position;

 fail:
  info->bytes = taken;
  info->position = fifo->write_position;

  return taken;
}

void fifo_wrote(struct xillyfifo *fifo,
       unsigned int req_bytes) {

  int semval;

  if (req_bytes == 0)
    return;

  __sync_add_and_fetch(&fifo->bytes_in_fifo, req_bytes);
  __sync_add_and_fetch(&fifo->write_total, req_bytes);
 
  fifo->write_position += req_bytes;
 
  if (fifo->write_position >= fifo->size)
    fifo->write_position -= fifo->size;
 
  if (sem_getvalue(&fifo->read_sem, &semval))
    semval = 1; // This fallback should never happen

  // Don't increment the semaphore if it's nonzero anyhow. The possible
  // race condition between reading and possibly incrementing has no effect.

  if (semval == 0)
    sem_post(&fifo->read_sem);
}

/*********************************************************************
 *                                                                   *
 *                 A P P L I C A T I O N   C O D E                   *
 *                                                                   *
 *********************************************************************/

// Read from FIFO, write to standard output

void *write_thread(void *arg)
{
  struct xillyfifo *fifo = arg;
  int do_bytes, written_bytes;
  struct xillyinfo info;
  unsigned char *buf;

  do_bytes = fifo_request_drain(fifo, &info);

  while (do_bytes > 0) {
    do_bytes = fifo_request_drain(fifo, &info);

    if (do_bytes == 0)
      return NULL;

    for (buf = info.addr; do_bytes > 0;
    buf += written_bytes, do_bytes -= written_bytes) {

      written_bytes = write(write_fd, buf, do_bytes);

      if ((written_bytes < 0) && (errno != EINTR)) {
   perror("write() failed");
   return NULL;
      }

      if (written_bytes == 0) {
   fprintf(stderr, "Reached write EOF (?!)\n");
   fifo_done(fifo);
   return NULL;
      }

      if (written_bytes < 0) { // errno is EINTR
   written_bytes = 0;
   continue;
      }
     
      fifo_drained(fifo, written_bytes);
    }
  }
}

// Write to FIFO, read from standard output

void *read_thread(void *arg)
{
  struct xillyfifo *fifo = arg;
  int do_bytes, read_bytes;
  struct xillyinfo info;
  unsigned char *buf;

  do_bytes = fifo_request_write(fifo, &info);

  while (do_bytes > 0) {
    do_bytes = fifo_request_write(fifo, &info);

    if (do_bytes == 0)
      return NULL;

    for (buf = info.addr; do_bytes > 0;
    buf += read_bytes, do_bytes -= read_bytes) {

      read_bytes = read(read_fd, buf, do_bytes);

      if ((read_bytes < 0) && (errno != EINTR)) {
         perror("read() failed");
         return NULL;
      }

      if (read_bytes == 0) {
         // Reached EOF. Quit without complaining.
         fifo_done(fifo);
         return NULL;
      }

      if (read_bytes < 0) { // errno is EINTR
         read_bytes = 0;
         continue;
      }
     
      fifo_wrote(fifo, read_bytes);
    }
  }
}


void *status_thread(void *arg) {
  struct xillyfifo *fifo = arg;
 
  while (fifo->done < 2)
    fprintf(stderr, "%9d bytes in FIFO, %12ld read, %12ld written\r",
       __sync_add_and_fetch(&fifo->bytes_in_fifo, 0),
       __sync_add_and_fetch(&fifo->read_total, 0),
       __sync_add_and_fetch(&fifo->write_total, 0)      
       );
  return NULL;
}

int main(int argc, char *argv[]) {
  pthread_t tid[3];
  struct xillyfifo fifo;
  unsigned int fifo_size;

  if ((argc != 2) && (argc != 3)) {
    fprintf(stderr, "Usage: %s fifo_size [read-file]\n", argv[0]);
    exit(1);
  }

  fifo_size = atoi(argv[1]);

  if (fifo_size == 0) {
    fprintf(stderr, "Bad fifo_size argument %s\n", argv[1]);
    exit(1);
  }

  if (fifo_init(&fifo, fifo_size)) {
    perror("Failed to init");
    exit(1);
  }

  if (argc > 2) {
    read_fd = open(argv[2], O_RDONLY);
  }
  else {
    read_fd = open("/dev/xillybus_read_32", O_RDONLY);
  }

  if (read_fd < 0) {
    perror("Failed to open read file");
    exit(1);
  }

  if (argc > 2) {
    write_fd = open(argv[3], O_WRONLY);
  }
  else {
    write_fd = open("/dev/xillybus_write_32", O_WRONLY);
  }

  if (write_fd < 0) {
    perror("Failed to open write file");
    exit(1);
  }

  if (pthread_create(&tid[0], NULL, read_thread, &fifo)) {
    perror("Failed to create thread");
    exit(1);
  }

  if (pthread_create(&tid[1], NULL, write_thread, &fifo)) {
    perror("Failed to create thread");
    exit(1);
  }

  if (pthread_create(&tid[2], NULL, status_thread, &fifo)) {
    perror("Failed to create thread");
    exit(1);
  }

  /** Synch of threads in order to exit normally*/
  gettimeofday(&tv1, NULL);

  pthread_join(tid[0], NULL);
  pthread_join(tid[1], NULL);

  fifo.done = 2; // This is a hack for the status thread
  pthread_join(tid[2], NULL);

  gettimeofday(&tv2, NULL);
  printf("%f\n\r", (double)1000000*(tv2.tv_sec-tv1.tv_sec)+(tv2.tv_usec-tv1.tv_usec));

  fifo_destroy(&fifo);

  pthread_exit(NULL);

  return 0;
}

Top